Hive SQL on Flink 构建流批一体引擎
1. 构建流批一体引擎的挑战 2. Hive SQL on Flink 3. 流批一体引擎的收益 4. Demo 5. 未来展望
01
构建流批一体引擎的挑战
应用层的对接。在流批割裂的环境下,应用层仍然是有不同的提交平台,如何保证原来的应用层能无损且直接地对接到新的 SQL Gateway 上,是一个巨大的难点。
用户作业迁移的成本。用户原来的 Batch 作业是用 Hive SQL 进行撰写的,现在则需要替换成 Flink SQL。为了保证用户的作业能无损迁上来,我们需要解决语言上的兼容和用户所用的 UDF 的兼容。
Flink 对 Hive SQL 的兼容,我们在 1.16 中大大提升了对 Hive SQL 本身的兼容性。
我们在 Flink 社区引入了 SQL Gateway,从而兼容 Hive 的生态。
02
Hive SQL on Flink
2.1 Hive SQL on Flink 的具体工作
支持 Hive MetaStore 作为 Flink 的 Catalog,Hive 已有的表可自动注册进 Flink 中,用户无需再定义各种 DDL 来映射底层的 Hive 表。
支持 Hive MetaStore 存储 Flink 定义的 Hive 表/ 非 Hive 表。
支持从 Hive MetaStore 获得表的统计信息,从而优化查询的执行计划,提升端到端 SQL 的性能。
Hive 提供了非常丰富的 UDF,在 Flink 中我们可以直接调用 Hive 中内置的 UDF。换句话说,用户使用 Flink 就能享受到 Hive 那套内置 UDF 所带来的方便及易用性。
支持调用自定义的 Hive UDF。对于熟悉 Hive 的人,他们会基于 Hive UDF 的接口去定义自己的 UDF。但如果他们想用 Flink,又不想废弃那些 UDF,更不想重写。要怎么办呢?其实 Flink 支持调用用户自定义的 Hive UDF,所以用户不需要对 UDF 做任何重写的工作,这极大的方便了用户的操作。
支持流读/批读/流写/批写 Hive 表。
批读 Hive 表支持静态分区裁剪和动态分区裁剪。可以大幅削减读取数据的规模,从而提升读的性能和效率。
批读 Hive 表支持并发推断。在批场景下,并发设置是一个比较难的问题,但如果在批读 Hive 场景下,我们可以通过 Hive 表的文件信息推断出合理的并发,从而提升端到端链路的性能。
批写/流写 Hive 支持自定义分区提交策略。在批调度链路里,我们可能会把先提交分区,然后触发一些其他下游的操作或调度,这时我们无需引入其他额外的组件,直接在 Flink 里自定义这些分区提交的策略即可。比如指定分区提交后,触发定时任务或者在消息队列插一条数据等等。
流写 Hive 表支持小文件自动合并。在流的场景下,会生成很多小文件,但在流写 Hive 表时,我们支持小文件的自动合并,通过将小文件合并成更大的文件,减少了小文件的数量,从而缓解 HDFS 集群的压力。
批写 Hive 表支持自动收集统计信息,这一部分完全兼容了 Hive 的行为。在使用 Hive 写 Hive 表的时候,它会收集统计信息并提交到 MetaStore。我们用 Flink 写 Hive 表的时候,也能支持将统计信息提交到 MetaStore,包括文件的大小、数据的条数等等。
2.2 Flink 兼容 Hive SQL 的架构
2.3 Flink 对 Hive SQL 的兼容
支持 distribute by/sort by/ cluster by。
支持 multi insert。一个 scan 可以插入到多个不同数据的 sink 端,极大的提高了数据 ETL 链路的效率。
支持 insert directory。
支持 load data。
支持 create function using jar。
……
基于 Hive 2.3 的 qtest 测试集,12000 条 DQL/DML 都扔到 Flink 去执行,这些 SQL 都能够被正常执行。
12000 条 DQL/DML 也包含了很多对 ACID 表的查询。Hive 的 ACID 表在生产中用的较少,如果我们除去针对 ACID 表的 DQL/DML,兼容度可达 97%。
2.4 Flink 对 Hive 生态的兼容
2.5 引入 Flink SQL Gateway 的原因
目前 Flink 社区官方提供了 SQL Client 供用户提交 SQL 作业。但由于 SQL Client 本身没有服务化,用户往往需要基于 SQL Client 做一层封装,添加一个服务化的前端。通过该服务化的前端,用户的 SQL 作业最终会被提交给 SQL Client 去执行。以上的过程比较繁琐而且开发成本较大,因此,我们在社区提供了一个默认的服务化的实现,降低用户的使用成本。
以上的方案是基于 SQL Client 来做的作业提交,但这套 API 并不稳定。而引入的 SQL Gateway 则提供了稳定的 API。
相比于 SQL Client, SQL Gateway 是 C/S 架构,更容易对接诸多生态 ,e.g. HiveServer2。
开箱即用,用户可以直接使用 SQL Gateway 搭建一个生产可用的提交工具。
生态对接,提供了稳定的 API,方便 Flink 对接其它生态工具。
兼容 HiveServer2 协议,提供了 HiveServer2 Endpoint 以兼容 Hive 生态。
2.6 Flink SQL Gateway 架构
REST Endpoint:用户可以通过 REST 工具提交作业。
HiveServer2 Endpoint:通过它我们就能提供对接 Hive 主流生态的能力。
2.7 HiveServer2 Endpoint
03
流批一体引擎的收益
3.1 Hive SQL on Flink 构建流批一体引擎
第一,统一流批引擎。降低维护成本,提升研发的效率。因为我们现在就一套引擎了,所以维护成本会非常低。
第二,流批一体数仓。我们通过流批一体引擎构建出了流批一体 SQL 层。借此,我们可以把流批一体的存储考虑进来,构建完整的流批一体数仓架构。
第三,Hive SQL 实时化。目前 Hive SQL 主要还是跑在批引擎上,每天做一次调度,产生结果。如果把 Hive SQL 迁移到 Flink 中,我们就可以很方便的将它实时化改造。只要把引擎模式设置成流模式,就可以将其实时化,数仓实时化改造的成本非常低。
第四,OLAP & 联邦查询。我们可以基于 Flink + Hive SQL 搭建 OLAP 系统。借助 Flink 对各种数据源的支持,以及对 Hive SQL 稍微进行扩展就可以实现联邦查询。
3.2 基于 Hive 语法进行联邦查询
04
Demo
实时的 pipeline,我们往往通过 Flink 将 Kafka 的数据进行打宽聚合写入下游,并通过 Flink 写入 HDFS 的最终表。
离线的 pipeline,我们则可以通过周期性地调度 Flink 作业将数据写入到 HDFS 中。为了保持数据的正确性,在 Lambda 架构之中往往通过将批的结果回刷到 HDFS 中,保证数据的正确性。
05
未来展望
优化读各种格式的文件,包括对读 Parquet 文件的嵌套列 PushDown、FilterPushDown 的优化等,从而提升性能。
提升写 Hive 端到端的生产可用性。比如,批模式下解决小文件多的问题。
根据用户的反馈不断加强 Hive 的语法支持。
SQL Client 支持向 SQL Gateway 提交 SQL,保证功能完整性。
补全认证功能,保证 SQL Gateway 基本生产可用。
基于 SQL Gateway 对接更多生态工具,增强 SQL Gateway 的应用范围。
往期精选
▼ 登录「Flink-learning 学训平台」,加入学习 ▼